In this lab we will use denoising to clean up some noisy recordings. We will use the following sounds:
Speech in a room: [https://drive.google.com/uc?export=download&id=1g5nbBpNbCjqs8cSoW0Lb-owe15tswDwW ]
ATC: [https://drive.google.com/uc?export=download&id=1fh5iL5qRj3-PUP_sG5-QE5DofACY_Hr5 ]
Speech with wind: [https://drive.google.com/uc?export=download&id=1g7TIkLpnWDFNUQYgCqIUoSOkk5EIK16_ ]
Let’s start by cleaning up these sounds as best as we can. We will do a straightforward magnitude spectral subtraction. For all of these sounds there is only noise in the first few seconds of the recording so that you can learn a noise profile from there. Do the following: Perform an STFT of the recordings
Estimate the magnitude spectrum of the noise from the beginning of the recording
Perform spectral subtraction by subtracting that spectrum from the input’s magnitude STFT
Use the original signal’s phase to convert back to a time series.
Make a note of which examples sound the best and are easier to work with. Try to explain why.
At this point some of the outputs will exhibit “musical noise”. To minimize its effects apply a median filter on the denoised magnitude spectrogram to make it sound better (hint: scipy.signal.medfilt2). How big should the median window be? Try different values and find which work best.
import numpy as np
import matplotlib.pyplot as plt
from scipy.io.wavfile import read
from scipy import signal
from copy import deepcopy
import math
def sound( x, rate=8000, label=''):
from IPython.display import display, Audio, HTML
if label is '':
display( Audio( x, rate=rate))
else:
display( HTML(
'<style> table, th, td {border: 0px; }</style> <table><tr><td>' + label +
'</td><td>' + Audio( x, rate=rate)._repr_html_()[3:] + '</td></tr></table>'
))
def stft(input_sound, dft_size, hop_size, zero_pad, window):
result = []
for i in range(0, len(input_sound), hop_size):
slot = []
start = i
end = i + dft_size
if end >= len(input_sound):
slot = input_sound[start:]
zero = np.array([0] * (end - len(input_sound)))
slot = np.concatenate([slot, zero])
else:
slot = input_sound[start:end]
result.append(np.multiply(np.fft.rfft(np.concatenate([np.multiply(slot, window), np.array(zero_pad)])), (hop_size * 2 / dft_size)))
return result
def show_graph(sub_fig, freq_data, amp_data_len, sample_rate, mode="spectrogram", title=""):
if mode == "spectrogram":
transposed_data = np.log(np.absolute(freq_data).real).T
t1 = np.linspace(0, amp_data_len / sample_rate, len(transposed_data[0]))
t2 = np.linspace(0, sample_rate / 2, len(transposed_data))
x1, x2 = np.meshgrid(t1, t2)
sub_fig.pcolormesh(x1, x2, np.array(transposed_data))
sub_fig.set_title(title)
def get_bg_amp(input_sound, dft_size, hop_size, zero_pad, window):
spectrogram = np.abs(stft(input_sound, dft_size, hop_size, zero_pad, window))
return np.mean(spectrogram, axis=0)
def spec_subtract(alpha, med_kern_size, bg_amp, spectrogram):
result = np.zeros(np.array(spectrogram).shape, dtype=complex)
multiplier = np.zeros(np.array(spectrogram).shape, dtype=float)
for j in range(len(spectrogram)):
for i in range(len(spectrogram[j])):
multiplier[j][i] = max((abs(spectrogram[j][i]) - abs(bg_amp[i]) * alpha) / abs(spectrogram[j][i]), 0.0)
result = spectrogram * np.add(signal.medfilt2d(multiplier, kernel_size=med_kern_size), 0.00000001)
return result
def istft(stft_output, dft_size, hop_size, zero_pad, window):
# YOUR CODE HERE
result = np.array([0] * (hop_size * len(stft_output) - hop_size + dft_size), dtype=float)
for i in range(len(stft_output)):
each_wave = np.fft.irfft(stft_output[i], n=dft_size + len(zero_pad))[:dft_size]
result[(i * hop_size):(i * hop_size + dft_size)] = np.add(result[(i * hop_size):(i * hop_size + dft_size)], np.multiply(np.multiply(each_wave, window), (0.2)))
return result[:hop_size * len(stft_output)]
sr_ws, data_ws = read("wind-speech.wav")
sr_ac, data_ac = read("aircomm.wav")
sr_rs, data_rs = read("room-speech.wav")
dft_size = 512
hop_size = 128
window = window = signal.hann(dft_size)
zero_pad = []
spec_ws = stft(data_ws, dft_size, hop_size, zero_pad, window);
spec_ac = stft(data_ac, dft_size, hop_size, zero_pad, window);
spec_rs = stft(data_rs, dft_size, hop_size, zero_pad, window);
bg_ws = data_ws[:int(sr_ws * 2)]
bg_ac = data_ac[:int(sr_ac * 0.4)]
bg_rs = data_rs[:int(sr_rs * 2)]
bg_amp_ws = get_bg_amp(bg_ws, dft_size, hop_size, zero_pad, window)
bg_amp_ac = get_bg_amp(bg_ac, dft_size, hop_size, zero_pad, window)
bg_amp_rs = get_bg_amp(bg_rs, dft_size, hop_size, zero_pad, window)
spec_ws_denoised = spec_subtract(1.5, 1, bg_amp_ws, spec_ws)
spec_ac_denoised = spec_subtract(2.0, 1, bg_amp_ac, spec_ac)
spec_rs_denoised = spec_subtract(1.5, 1, bg_amp_rs, spec_rs)
spec_ws_denoised_smooth_3 = spec_subtract(1.5, 3, bg_amp_ws, spec_ws)
spec_ac_denoised_smooth_3 = spec_subtract(2.0, 3, bg_amp_ac, spec_ac)
spec_rs_denoised_smooth_3 = spec_subtract(1.5, 3, bg_amp_rs, spec_rs)
spec_ws_denoised_smooth_5 = spec_subtract(1.5, 5, bg_amp_ws, spec_ws)
spec_ac_denoised_smooth_5 = spec_subtract(2.0, 5, bg_amp_ac, spec_ac)
spec_rs_denoised_smooth_5 = spec_subtract(1.5, 5, bg_amp_rs, spec_rs)
data_ws_denoised = istft(spec_ws_denoised, dft_size, hop_size, zero_pad, window)
data_ac_denoised = istft(spec_ac_denoised, dft_size, hop_size, zero_pad, window)
data_rs_denoised = istft(spec_rs_denoised, dft_size, hop_size, zero_pad, window)
data_ws_denoised_smooth_3 = istft(spec_ws_denoised_smooth_3, dft_size, hop_size, zero_pad, window)
data_ac_denoised_smooth_3 = istft(spec_ac_denoised_smooth_3, dft_size, hop_size, zero_pad, window)
data_rs_denoised_smooth_3 = istft(spec_rs_denoised_smooth_3, dft_size, hop_size, zero_pad, window)
data_ws_denoised_smooth_5 = istft(spec_ws_denoised_smooth_5, dft_size, hop_size, zero_pad, window)
data_ac_denoised_smooth_5 = istft(spec_ac_denoised_smooth_5, dft_size, hop_size, zero_pad, window)
data_rs_denoised_smooth_5 = istft(spec_rs_denoised_smooth_5, dft_size, hop_size, zero_pad, window)
print("wind speech:")
sound(data_ws, rate=sr_ws, label='wind-speech original')
sound(data_ws_denoised, rate=sr_ws, label='wind-speech denoised')
sound(data_ws_denoised_smooth_3, rate=sr_ws, label='wind-speech denoised & smoothed 3')
sound(data_ws_denoised_smooth_5, rate=sr_ws, label='wind-speech denoised & smoothed 5')
print("aircomm:")
sound(data_ac, rate=sr_ac, label='aircomm original')
sound(data_ac_denoised, rate=sr_ac, label='aircomm denoised')
sound(data_ac_denoised_smooth_3, rate=sr_ac, label='aircomm denoised & smoothed 3')
sound(data_ac_denoised_smooth_5, rate=sr_ac, label='aircomm denoised & smoothed 5')
print("room speech:")
sound(data_rs, rate=sr_rs, label='room-speech original')
sound(data_rs_denoised, rate=sr_rs, label='room-speech denoised')
sound(data_rs_denoised_smooth_3, rate=sr_rs, label='room-speech denoised & smoothed 3')
sound(data_rs_denoised_smooth_5, rate=sr_rs, label='room-speech denoised & smoothed 5')
fig, (fig_spec_ws, fig_spec_ws_denoised, fig_spec_ws_denoised_smooth_3, fig_spec_ws_denoised_smooth_5,
fig_spec_ac, fig_spec_ac_denoised, fig_spec_ac_denoised_smooth_3, fig_spec_ac_denoised_smooth_5,
fig_spec_rs, fig_spec_rs_denoised, fig_spec_rs_denoised_smooth_3, fig_spec_rs_denoised_smooth_5) = plt.subplots(nrows=12)
show_graph(fig_spec_ws, spec_ws, len(data_ws), sr_ws, "spectrogram", "wind-speech origin")
show_graph(fig_spec_ws_denoised, spec_ws_denoised, len(data_ws), sr_ws, "spectrogram", "wind-speech denoised")
show_graph(fig_spec_ws_denoised_smooth_3, spec_ws_denoised_smooth_3, len(data_ws), sr_ws, "spectrogram", "wind-speech denoised & smoothed 3")
show_graph(fig_spec_ws_denoised_smooth_5, spec_ws_denoised_smooth_5, len(data_ws), sr_ws, "spectrogram", "wind-speech denoised & smoothed 5")
show_graph(fig_spec_ac, spec_ac, len(data_ac), sr_ac, "spectrogram", "aircomm origin")
show_graph(fig_spec_ac_denoised, spec_ac_denoised, len(data_ac), sr_ac, "spectrogram", "aircomm denoised")
show_graph(fig_spec_ac_denoised_smooth_3, spec_ac_denoised_smooth_3, len(data_ac), sr_ac, "spectrogram", "aircomm denoised & smoothed 3")
show_graph(fig_spec_ac_denoised_smooth_5, spec_ac_denoised_smooth_5, len(data_ac), sr_ac, "spectrogram", "aircomm denoised & smoothed 5")
show_graph(fig_spec_rs, spec_rs, len(data_rs), sr_rs, "spectrogram", "room-speech origin")
show_graph(fig_spec_rs_denoised, spec_rs_denoised, len(data_rs), sr_rs, "spectrogram", "room-speech denoised")
show_graph(fig_spec_rs_denoised_smooth_3, spec_rs_denoised_smooth_3, len(data_rs), sr_rs, "spectrogram", "room-speech denoised & smoothed 3")
show_graph(fig_spec_rs_denoised_smooth_5, spec_rs_denoised_smooth_5, len(data_rs), sr_rs, "spectrogram", "room-speech denoised & smoothed 5")
fig.set_size_inches(6, 48)
fig.tight_layout()
plt.show()
The easiest sample to be denoised is "room-speech" because it's background noise has less fluctuation than other two samples. "Wind-speech" has gradually increasing wind noise so some noise is still detectable at middle part after applying subtraction. The noise of "aircomm"has much bigger amplitude and short time fluctuation than the sound we want to extract, so the musical noise is very noticable. With applying median filter, set kernel size to 3 is high enough for "wind-speech" and "room-speech" to eliminate most musical noise except "aircomm", which should set the size as 5. But in this case, the sound we want to extract is seriously degraded, so we still cannot know what the person is talking about in "aircomm."
For the last sound we have an evolving noise profile, which causes problems since our noise description from the first two seconds isn’t accurate throughout. Because we’re lazy we want to automatically update the noise model and not to select it manually. To do so we need a VAD that lets us know when to gather noise statistics and when to denoise. Do the following:
def spec_dynamic_subtract(alpha, med_kern_size, speech_or_noise, spectrogram):
result = np.zeros(np.array(spectrogram).shape, dtype=complex)
multiplier = np.zeros(np.array(spectrogram).shape, dtype=float)
noise = []
for j in range(len(spectrogram)):
if speech_or_noise[j] == 0:
noise.append(spectrogram[j])
if len(noise) > 10:
noise.pop(0)
bg_amp = np.mean(np.abs(np.array(noise)), axis=0)
for i in range(len(spectrogram[j])):
multiplier[j][i] = max((abs(spectrogram[j][i]) - abs(bg_amp[i]) * alpha) / abs(spectrogram[j][i]), 0.0)
return spectrogram * np.add(signal.medfilt2d(multiplier, kernel_size=med_kern_size), 0.00000001)
spec_ws = stft(data_ws, dft_size, hop_size, zero_pad, window);
lowpass = signal.firwin(1024, 3000, fs=sr_ws)
squared_waveform = np.multiply(data_ws, data_ws)
filtered_data_ws = np.convolve(lowpass, squared_waveform, mode='same')
threshold = 0.02
speech_or_noise = []
for i in range(0, len(spec_ws)):
if max(np.concatenate([squared_waveform, np.array([0] * 512)])[(i * 128):(i * 128) + 512]) > threshold:
speech_or_noise.append(1)
else:
speech_or_noise.append(0)
spec_ws_denoised_dynamic = spec_dynamic_subtract(1.5, 1, speech_or_noise, spec_ws)
data_ws_denoised_dynamic = istft(spec_ws_denoised_dynamic, dft_size, hop_size, zero_pad, window)
sound(data_ws, rate=sr_ws, label='wind-speech original')
sound(data_ws_denoised, rate=sr_ws, label='wind-speech denoised static')
sound(data_ws_denoised_dynamic, rate=sr_ws, label='wind-speech denoised dynamic')
fig, (fig_energy, fig_spec_ws, fig_spec_static, fig_spec_dynamic) = plt.subplots(nrows=4)
fig_energy.plot(squared_waveform)
fig_energy.set_title("energy")
show_graph(fig_spec_ws, spec_ws, len(data_ws), sr_ws, "spectrogram", "wind-speech origin")
show_graph(fig_spec_static, spec_ws_denoised, len(data_ws), sr_ws, "spectrogram", "wind-speech denoised static")
show_graph(fig_spec_dynamic, spec_ws_denoised_dynamic, len(data_ws), sr_ws, "spectrogram", "wind-speech denoised dynamic")
fig.set_size_inches(6, 16)
fig.tight_layout()
plt.show()